package com.example.dobs.demo.flink.realtime.report.study.flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Date;
import java.util.Properties;

/**
 * Demo Flink streaming job that reads string records from the Kafka topic
 * {@code ads_adx_test0912} and prints each record, prefixed with the current
 * date/time at the moment it is mapped.
 */
public class DownLoadKafka {
    /**
     * Builds the Kafka-to-print pipeline and submits it for execution.
     *
     * @param args command-line arguments; not read by this job
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        // Kafka consumer settings: broker address, consumer group, and the
        // "auto.offset.reset" policy (set to "earliest").
        Properties consumerConfig = new Properties();
        consumerConfig.setProperty("bootstrap.servers", "test.test0912.adx.ads.sg1.mq:9092");
        consumerConfig.setProperty("group.id", "ads_adx_grtest0912");
        consumerConfig.setProperty("auto.offset.reset", "earliest");

        // Records are deserialized as plain strings.
        FlinkKafkaConsumer<String> kafkaConsumer =
                new FlinkKafkaConsumer<>("ads_adx_test0912", new SimpleStringSchema(), consumerConfig);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> streamSource = env.addSource(kafkaConsumer);

        // Prefix every record with a timestamp and write it to the print sink.
        streamSource.map(new TimestampPrefixer()).print();

        // Disabled alternative sink, kept for reference:
//        streamSource.addSink(new FlinkKafkaProducer011<>("ads_adx_test0910","ads_adx_test0912", new SimpleStringSchema())).name("xxx");

        env.execute();
    }

    /**
     * Prepends the string form of the current {@link Date}, followed by
     * {@code ":  "}, to each incoming record.
     */
    private static final class TimestampPrefixer implements MapFunction<String, String> {
        @Override
        public String map(String value) throws Exception {
            return new Date() + ":  " + value;
        }
    }
}
